Skip to main content

2、Work queues

http://previous.rabbitmq.com/v3_5_7/tutorials/tutorial-two-php.html

设置 确保消息永不丢失

设置第四个参数来打开它们basic_consume 为 false (true表示没有ACK),

$ channel-> basic_consume('task_queue'''falsefalsefalsefalse,$ callback);

设置 可持久化

我们需要确保RabbitMQ永远不会丢失我们的队列。为此,我们需要声明它是持久的。为此,我们将第三个参数传递给queue_declare为true:

$ channel-> queue_declare('task_queue'falsetruefalsefalse;

确信即使RabbitMQ重新启动,task_queue队列也不会丢失

将消息标记为持久性

通过设置delivery_mode = 2消息属性,AMQPMessage将其作为属性数组的一部分。

$msg = new AMQPMessage($data,
array('delivery_mode' => 2) # make message persistent
);

 new_task.php file:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

//第三个参数为true时,为可持久化
$channel->queue_declare('task_queue', false, true, false, false);

//以允许从命令行发送任意消息
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "Hello World!";
}

//要将消息标记为持久性 - 通过设置delivery_mode = 2消息属性,AMQPMessage将其作为属性数组的一部分。
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', 'task_queue');

echo ' [x] Sent ', $data, "\n";

$channel->close();
$connection->close();
?>

 worker.php:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

//第三个参数为true为可持久化
$channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

//它需要为消息体中的每个点伪造一秒钟的工作。它将从队列中弹出消息并执行任务
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//在处理并确认前一个消息之前,不要向工作人员发送新消息。
$channel->basic_qos(null, 1, null);
/**
* 为了确保消息永不丢失,RabbitMQ支持消息确认。从消费者发回ack(nowledgement)以告知RabbitMQ已收到,处理了特定消息,并且RabbitMQ可以自由删除它。
* 默认情况下,消息确认已关闭。现在是时候通过设置第四个参数来打开它们basic_consume到假 (true表示没有ACK),并从工作人员发送适当的确认,一旦我们有一个任务来完成。
**/
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while (count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$connection->close();
?>